Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle ordering of first last aggregation inside aggregator #8662

Merged
merged 10 commits into from
Dec 28, 2023
Merged

Handle ordering of first last aggregation inside aggregator #8662

merged 10 commits into from
Dec 28, 2023

Conversation

mustafasrepo
Copy link
Contributor

@mustafasrepo mustafasrepo commented Dec 27, 2023

Which issue does this PR close?

Closes #.
Improves situation on #8662
Related to #8582

Rationale for this change

This PR implements the observation by @alamb at the PR that for first and last value aggregation we do not need to sort entire data at its input.

In other words, This PR is the FIRST_VALUE and LAST_VALUE aggregation support of the approach 3 in the design document

What changes are included in this PR?

Are these changes tested?

Yes

Are there any user-facing changes?

@github-actions github-actions bot added physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt) labels Dec 27, 2023
@@ -78,7 +78,7 @@ c 4
query I
SELECT DISTINCT ON (c1) c2 FROM aggregate_test_100 ORDER BY c1, c3;
----
5
4
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran same query in the postgre, it gave the same result with the new version.

@mustafasrepo mustafasrepo marked this pull request as draft December 27, 2023 11:53
// Append ordering requirements to expressions' results.
// This way order sensitive aggregators can satisfy requirement
// themselves.
if let Some(ordering_req) = agg.order_bys() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since aggregators themselves handle ordering. We append ordering expression values to the field also for all modes.

}
})
.collect::<Vec<_>>();
let indices = lexsort_to_indices(&sort_columns, Some(1))?;
Copy link
Contributor Author

@mustafasrepo mustafasrepo Dec 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a min max alternative to this we can use that one also. However, as far as I know there is no util for this support. Maybe @tustvold can answer this, if he is familiar with.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not aware of a min/max kernel that returns the ordinal position of the min/max

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW I had the same basic need (find the position of min/max so I could find a value in a corresponding column) while implementing our special selector_first, selector_last, etc functions in InfluxDB 3.0 (I also had to code them specially)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think you implementation is more efficient? If that is the case, maybe we can use that code instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think our implementation is (slightly) more efficient, but it is less general (only works for timestamp columns). You can see the basic idea here

https://github.com/influxdata/influxdb/blob/main/query_functions/src/selectors.rs

And the comparision is here: https://github.com/influxdata/influxdb/blob/acfef87659c9a8c4c49e4628264369569e04cad1/query_functions/src/selectors/internal.rs#L119-L127

I think we should stay with the ScalarValue implementation unless we find some query where this calculation is taking most of the time

@mustafasrepo mustafasrepo marked this pull request as ready for review December 27, 2023 12:22
----------------------CoalesceBatchesExec: target_batch_size=8192
------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
--------------------------MemoryExec: partitions=1, partition_sizes=[3]
------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since first_value and last_value no longer requires ordering at its input. SortExecs are removed from the plan.

@@ -2209,7 +2208,7 @@ ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c)
----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]

query III
SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c
SELECT a, b, LAST_VALUE(c ORDER BY a DESC, c ASC) as last_c
Copy link
Contributor Author

@mustafasrepo mustafasrepo Dec 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The result of this test was not unique according to specifications (Since column a is not unique). I changed test to make result unique.

// - There is a more recent entry in terms of requirement
if !self.is_set
|| self.orderings.is_empty()
|| compare_rows(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sure you are aware but https://docs.rs/arrow-row/latest/arrow_row/ will be a much faster way to perform row-based comparisons than relying on ScalarValue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, however, here we are checking just a single row (row that have lowest value). Hence I don't think it is worth to conversion here.

Copy link
Contributor

@alamb alamb Dec 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that since it is a single column max comparison this is probably fine (and no worse than the current implementation). If we need to optimize performance we could probably implement specialized implementations (like FirstValue<ArrowPrimitiveType> and skip the copying entirely.

That is likely a premature optimization at this point

Update: Row format may well be a good idea (not for this PR). I will wait until I have reviewed this code to offer a more informed opinion

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I re-reviewed and I agree that the RowFormat is not needed here (and in fact it may actually be slower) because, as @mustafasrepo points out, this code uses ScalarValue to compare a single row per batch (it finds the largest/smallest row per batch using lexsort_to_indices). We would have to benchmark to be sure.

@alamb
Copy link
Contributor

alamb commented Dec 27, 2023

I plan to review this carefully either later today or tomorrow. I want to get a draft of #8491 first

Copy link
Contributor

@ozankabak ozankabak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed this PR and it looks good to me. @alamb, let us know what you think and if we can improve it

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @mustafasrepo and @ozankabak -- this PR looks good to me. ❤️

I believe if we applied the same change to ArrayAgg I think we can remove the limitation of a single compatible ORDER BY in a query -- aka #8582 -- is that your understanding too?

I am sorry for the delay in reviewing, I am partly on holiday this week so don't have as much time to devote to these endeavors as normal.

I think that in many common queries, this implementation is likely faster than what is on main because it doesn't potentially re-sort the entire input (it instead used lexsort_to_indices)

As we discussed in the design document the potential downside of this approach is that if multiple aggregates share the same ORDER BY clause, they will each independently sort the input batches, which is unfortunate but could be optimized in future PTs

};
// Update when there is no entry in the state, or we have an "earlier"
// entry according to sort requirements.
if !self.is_set
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory, we may be able to use a Option<ScalarValue> instead of ScalarValue and is_set flag, but I don't think it matters for performance and this PR follows the existing implementation as well 👍

// - There is a more recent entry in terms of requirement
if !self.is_set
|| self.orderings.is_empty()
|| compare_rows(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I re-reviewed and I agree that the RowFormat is not needed here (and in fact it may actually be slower) because, as @mustafasrepo points out, this code uses ScalarValue to compare a single row per batch (it finds the largest/smallest row per batch using lexsort_to_indices). We would have to benchmark to be sure.

}
})
.collect::<Vec<_>>();
let indices = lexsort_to_indices(&sort_columns, Some(1))?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think our implementation is (slightly) more efficient, but it is less general (only works for timestamp columns). You can see the basic idea here

https://github.com/influxdata/influxdb/blob/main/query_functions/src/selectors.rs

And the comparision is here: https://github.com/influxdata/influxdb/blob/acfef87659c9a8c4c49e4628264369569e04cad1/query_functions/src/selectors/internal.rs#L119-L127

I think we should stay with the ScalarValue implementation unless we find some query where this calculation is taking most of the time

aggr_expr.as_any().is::<FirstValue>()
|| aggr_expr.as_any().is::<LastValue>()
|| aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
aggr_expr.as_any().is::<OrderSensitiveArrayAgg>()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually this would be a nice thing to move into the AggregateExpr trait directly so we could override it and avoid special casing built in functions. Not for this PR though :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this idea 👍

@@ -100,10 +100,9 @@ ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_tes
------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)], ordering_mode=Sorted
--------------SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC NULLS LAST]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do love the lack of Sort here

@github-actions github-actions bot added optimizer Optimizer rules core Core DataFusion crate labels Dec 28, 2023
@ozankabak
Copy link
Contributor

I believe if we applied the same change to ArrayAgg I think we can remove the limitation of a single compatible ORDER BY in a query -- aka #8582 -- is that your understanding too?

Yes, if we make ARRAY_AGG sort internally, we can do this

As we discussed in the design document the potential downside of this approach is that if multiple aggregates share the same ORDER BY clause, they will each independently sort the input batches, which is unfortunate but could be optimized in future PTs

Exactly. We will explore both the split/diamond approach and the approach above in the upcoming weeks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate optimizer Optimizer rules physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants